package com.linys.scala.qf.day06_spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object AggregateRdd {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Aggregate").setMaster("local[*]")
    val sc = new SparkContext(conf)
    /* Action 算子*/
    //集合函数
    val rdd1 = sc.parallelize(List(2, 1, 3, 6, 5), 3)
    val rdd1_1 = rdd1.reduce(_ + _)
    println(rdd1_1)
    //以数组的形式返回数据集的所有元素
    println(rdd1.collect().toBuffer)
    //返回RDD的元素个数
    println(rdd1.count())
    //取出对应数量的值 默认降序, 若输入0 会返回一个空数组
    println(rdd1.top(3).toBuffer)
    //顺序取出对应数量的值
    println(rdd1.take(3).toBuffer)
    //顺序取出对应数量的值 默认生序
    println(rdd1.takeOrdered(3).toBuffer)
    //获取第一个值 等价于 take(1)
    println(rdd1.first())
    //将处理过后的数据写成文件(存储在HDFS或本地文件系统)
    //rdd1.saveAsTextFile("dir/file1")
    //统计key的个数并生成map k是key名 v是key的个数
    val rdd2 = sc.parallelize(List(("key1", 2), ("key2", 1), ("key3", 3), ("key4", 6), ("key5", 5)), 2)
    val rdd2_1: collection.Map[String, Long] = rdd2.countByKey()
    println(rdd2_1)
    //遍历数据
    rdd1.foreach(x => println(x))

    /*其他算子*/
    //统计value的个数 但是会将集合中的一个元素看做是一个vluae
    val value: collection.Map[(String, Int), Long] = rdd2.countByValue
    println(value)
    //filterByRange:对RDD中的元素进行过滤,返回指定范围内的数据
    val rdd3 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
    val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c", "e") //包括开始和结束的
    println(rdd3_1.collect.toList)
    //flatMapValues对参数进行扁平化操作,是value的值
    val rdd3_2 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
    println(rdd3_2.flatMapValues(_.split(" ")).collect.toList)
    //foreachPartition 循环的是分区数据
    // foreachPartiton一般应用于数据的持久化,存入数据库,可以进行分区的数据存储
    val rdd4 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
    rdd4.foreachPartition(x => println(x.reduce(_ + _)))
    //keyBy 以传入的函数返回值作为key ,RDD中的元素为value 新的元组
    val rdd5 = sc.parallelize(List("dog", "cat", "pig", "wolf", "bee"), 3)
    val rdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length)
    println(rdd5_1.collect.toList)
    //keys获取所有的key  values 获取所有的values
    println(rdd5_1.keys.collect.toList)
    println(rdd5_1.values.collect.toList)
    //collectAsMap  将需要的二元组转换成Map
    val map: collection.Map[String, Int] = rdd2.collectAsMap()
    println(map)

  }

}
